#ifndef __CLUSTER_H__
#define __CLUSTER_H__

#include <stdio.h>

#include "ynet_net.h"
#include "job_dock.h"
#include "adt.h"
#include "etcd.h"
#include "sysy_conf.h"
#include "net_global.h"
#include "vec.h"
#include "diskmd.h"

/*
 * Single-bit option flags; the values are distinct bits, so they can be
 * OR-ed together.
 * NOTE(review): presumably passed as the `flag` argument of
 * dispatch_newdisk() -- confirm against the callers.
 */
#define __NEWDISK_BALANCE__    0x00000001
#define __NEWDISK_CONTROLLER__ 0x00000002
#define __NEWDISK_FORCE__      0x00000004

#define NODE_STATUS_MAX 16

#define NODE_STATUS_UNKNOW "offline"
#define NODE_STATUS_NONE "none"
#define NODE_STATUS_META "meta"
#define NODE_STATUS_NORMAL "normal"
#define NODE_STATUS_ADMIN "admin"

#define STORAGE_AREA_KEY        LICH_SYSTEM_ATTR_STORAGE_AREA
#define STORAGE_AREA_VALUE_NULL LICH_SYSTEM_ATTR_NULL

#define NODE_TYPE NODE_STATUS_UNKNOW, NODE_STATUS_NONE, NODE_STATUS_NORMAL, NODE_STATUS_META, NODE_STATUS_ADMIN

typedef enum {
        __NODE_UNKNOW__,
        __NODE_NONE__,
        __NODE_NORMAL__,
        __NODE_META__,
        __NODE_ADMIN__,
} node_type_t;

/*
 * Translate a node_type_t value into its status string (one of the
 * NODE_STATUS_* literals listed in NODE_TYPE).
 *
 * @type  value to translate; anything above __NODE_ADMIN__ trips YASSERT(0)
 * @return the string stored at index @type of the NODE_TYPE table
 */
static inline const char *node_type(node_type_t type)
{
        static char *status_names[] = {NODE_TYPE};
        const int out_of_range = (type > __NODE_ADMIN__);

        if (out_of_range) {
                YASSERT(0);
        }

        return status_names[type];
}

/*
 * Reverse lookup of node_type(): find the node_type_t index whose status
 * string equals @name.
 *
 * @name  status string to look up (compared with strcmp)
 * @return the matching index; if nothing matches, YASSERT(0) fires and -1 is
 *         returned when the assertion does not abort
 */
static inline int node_type1(const char *name)
{
        static char *status_names[] = {NODE_TYPE};
        int idx = __NODE_UNKNOW__;

        while (idx <= __NODE_ADMIN__) {
                if (strcmp(name, status_names[idx]) == 0) {
                        return idx;
                }
                idx++;
        }

        YASSERT(0);

        return -1;
}
//int node_type1(const char *name);

/* NOTE(review): unit is not stated in this header -- presumably seconds; confirm. */
#define QUORUM_TIMEOUT 60

/* Single-bit node state flags, tested by node_stat_writeable(). */
#define __NODE_STAT_WRITEABLE__  0x00000001
#define __NODE_STAT_DELETING__   0x00000002
#define __NODE_STAT_READY__      0x00000004
#define __NODE_STAT_OFFLINE__    0x00000008

/*
 * Decide whether a status word describes a node that may be written to.
 *
 * @status  bit mask of __NODE_STAT_* flags
 * @return  1 when __NODE_STAT_WRITEABLE__ is set and __NODE_STAT_DELETING__
 *          is clear, 0 otherwise (which includes status == 0)
 */
static inline int node_stat_writeable(uint32_t status)
{
        const int writeable = (status & __NODE_STAT_WRITEABLE__) != 0;
        const int deleting = (status & __NODE_STAT_DELETING__) != 0;

        return (writeable && !deleting) ? 1 : 0;
}

/*
 * Fixed-size per-node status record. nodeinfo_encode() serializes it as a raw
 * sizeof(nodestat_t) blob, so its layout is part of the encoded format.
 */
typedef struct {
        nid_t nid;
        //uint32_t writeable; /*local fs can be write by cluster*/
        //uint32_t ready; /*connect to cluster, service is ready*/
        uint32_t type;          /* NOTE(review): presumably a node_type_t value -- confirm */
        uint32_t status;        /* NOTE(review): presumably a __NODE_STAT_* bit mask -- confirm */
        uint64_t load;
        int pool_count;
} nodestat_t;

/*
 * Pointer view of one node's information. Every member is a pointer;
 * nodeinfo_encode() packs the pointed-to data (strings with their NUL,
 * `stat` as sizeof(nodestat_t) bytes, `uptime` as sizeof(uint32_t) bytes) and
 * nodeinfo_decode() fills the pointers back in.
 * NOTE(review): after decoding the pointers presumably refer into the decoded
 * buffer rather than owning memory -- confirm against _opaque_decode().
 */
typedef struct {
        nodestat_t *stat;
        char *dfinfo;           /* text record in the dfinfo_encode() format */
        char *clustername;
        char *nodename;
        char *status;
        char *admin;
        //char *quorum;
        char *home;
        uint32_t *uptime;
} nodeinfo_t;

/* Total / online counters for each level: disk, node, rack, site. */
typedef struct {
        uint32_t disk_total;
        uint32_t disk_online;
        uint32_t node_total;
        uint32_t node_online;
        uint32_t rack_total;
        uint32_t rack_online;
        uint32_t site_total;
        uint32_t site_online;
} lich_stat_t;

/*
 * Report whether exactly half of the sites, racks or nodes are online.
 *
 * @lich_stat  counters to inspect (disk counters are not consulted)
 * @return     1 if online * 2 == total holds for the site, rack or node
 *             level, 0 otherwise. A level with total == 0 and online == 0
 *             satisfies the equality and therefore yields 1.
 */
static inline int lich_stat_half(const lich_stat_t *lich_stat)
{
        const int site_half = (lich_stat->site_online * 2 == lich_stat->site_total);
        const int rack_half = (lich_stat->rack_online * 2 == lich_stat->rack_total);
        const int node_half = (lich_stat->node_online * 2 == lich_stat->node_total);

        return (site_half || rack_half || node_half) ? 1 : 0;
}

/*
 * Serialize per-pool disk usage into a NUL-terminated text record.
 *
 * Format: "<pool_count>:<name>,<used-hex>,<total-hex>;<name>,..."
 *
 * @buf    destination buffer
 * @count  in:  capacity of @buf in bytes
 *         out: number of bytes produced, including the terminating NUL
 *              (left at 0 when the capacity is 0 and nothing can be written)
 * @dfree  source statistics; pool_stat[0 .. pool_count - 1] are read
 *
 * All writes are bounded by the capacity. If the "<pool_count>:" header does
 * not fit, an empty string is produced. Pool entries are appended while they
 * fit; the first entry that does not fit (or that is too long for the
 * MAX_INFO_LEN scratch buffer) ends the record.
 *
 * NOTE: when entries are dropped the header still announces the full
 * pool_count, so the record then holds fewer entries than it advertises.
 */
static inline void dfinfo_encode(char *buf, uint32_t *count, nodedfree_t *dfree)
{
        int i, n;
        size_t len, cap = *count;
        char info[MAX_INFO_LEN];

        if (cap == 0)
                return;

        n = snprintf(buf, cap, "%d:", dfree->pool_count);
        if (n < 0 || (size_t)n >= cap) {
                /* not even the header fits: hand back an empty record */
                buf[0] = '\0';
                *count = 1;
                return;
        }

        /* track the length ourselves instead of re-scanning buf each round */
        len = (size_t)n;

        for (i = 0; i < dfree->pool_count; i++) {
                DBUG("%d/%d pool %s used %ju total %ju\n", i, dfree->pool_count,
                     dfree->pool_stat[i].name,
                     dfree->pool_stat[i].used,
                     dfree->pool_stat[i].total);

                n = snprintf(info, sizeof(info), "%s,%lx,%lx;",
                             dfree->pool_stat[i].name, dfree->pool_stat[i].used, dfree->pool_stat[i].total);
                if (n < 0 || (size_t)n >= sizeof(info))
                        break;

                /* +1 keeps room for the terminating NUL */
                if (len + (size_t)n + 1 > cap)
                        break;

                memcpy(buf + len, info, (size_t)n + 1);
                len += (size_t)n;
        }

        *count = (uint32_t)(len + 1);
}

/*
 * Parse a text record in the dfinfo_encode() format
 * ("<pool_count>:<name>,<used-hex>,<total-hex>;...") into @dfree.
 *
 * @buf    NUL-terminated record
 * @count  unused
 * @dfree  receives pool_count and pool_stat[i].name/used/total
 *
 * Robustness:
 *  - the record is copied into a MAX_BUF_LEN scratch buffer with a bounded
 *    copy; an over-long record is truncated and then treated like any other
 *    incomplete record;
 *  - a record without ':' yields pool_count = 0;
 *  - if the record holds fewer complete entries than its header announces
 *    (dfinfo_encode() produces this when it runs out of space), decoding
 *    stops at the first incomplete entry and pool_count is set to the number
 *    of entries actually decoded. An incomplete entry is never partially
 *    stored.
 *
 * A numeric field that is not valid hex still trips YASSERT.
 *
 * NOTE(review): pool_count is not checked against the capacity of
 * dfree->pool_stat, and the name is copied with strcpy(); neither capacity is
 * visible from this header -- confirm both are large enough.
 */
static inline void dfinfo_decode(const char *buf, int count, nodedfree_t *dfree)
{
        int ret, i, announced;
        char info[MAX_BUF_LEN], *tmp;
        char *name, *used, *total;

        (void) count;

        snprintf(info, sizeof(info), "%s", buf);

        tmp = strchr(info, ':');
        if (!tmp) {
                dfree->pool_count = 0;
                goto out;
        }

        *tmp = '\0';
        announced = atoi(info);
        dfree->pool_count = announced;

        for (i = 0; i < announced; i++) {
                /* locate all three delimiters before storing anything */
                name = tmp + 1;
                tmp = strchr(name, ',');
                if (!tmp)
                        break;
                *tmp = '\0';

                used = tmp + 1;
                tmp = strchr(used, ',');
                if (!tmp)
                        break;
                *tmp = '\0';

                total = tmp + 1;
                tmp = strchr(total, ';');
                if (!tmp)
                        break;
                *tmp = '\0';

                strcpy(dfree->pool_stat[i].name, name);

                ret = sscanf(used, "%lx", &dfree->pool_stat[i].used);
                YASSERT(ret == 1);

                ret = sscanf(total, "%lx", &dfree->pool_stat[i].total);
                YASSERT(ret == 1);

                DBUG("%d pool %s used %ju total %ju\n",
                     i,
                     dfree->pool_stat[i].name,
                     dfree->pool_stat[i].used,
                     dfree->pool_stat[i].total);
        }

        /* left the loop early: only the first i entries are valid */
        if (i < announced)
                dfree->pool_count = i;

out:
        return;
}

/*
 * Sum the usage of every pool in @dfree.
 *
 * @dfree  statistics; pool_stat[0 .. pool_count - 1] are read
 * @used   receives the sum of all pool `used` values
 * @total  receives the sum of all pool `total` values
 */
static inline void dfree_count(nodedfree_t *dfree, uint64_t *used, uint64_t *total)
{
        int idx = 0;

        *used = 0;
        *total = 0;

        while (idx < dfree->pool_count) {
                *used += dfree->pool_stat[idx].used;
                *total += dfree->pool_stat[idx].total;
                idx++;
        }
}

/*
 * Sum the usage of the pools in @dfree whose name equals @pool.
 *
 * @dfree  statistics; pool_stat[0 .. pool_count - 1] are read
 * @pool   pool name to match (strcmp)
 * @used   receives the summed `used` of matching pools (0 if none match)
 * @total  receives the summed `total` of matching pools (0 if none match)
 */
static inline void dfree_count2(nodedfree_t *dfree, const char *pool, uint64_t *used, uint64_t *total)
{
        int idx;
        poolstat_t *entry;

        *used = 0;
        *total = 0;

        for (idx = 0; idx < dfree->pool_count; idx++) {
                entry = &dfree->pool_stat[idx];

                if (strcmp(entry->name, pool) != 0)
                        continue;

                *used += entry->used;
                *total += entry->total;
        }
}

/*
 * Pack a nodeinfo_t into @buf through _opaque_encode().
 *
 * Field order: nodename, clustername, status, home, stat, dfinfo, uptime,
 * admin. Strings are passed with their terminating NUL, `stat` as
 * sizeof(nodestat_t) bytes and `uptime` as sizeof(uint32_t) bytes.
 * nodeinfo_decode() reads the fields back in the same order.
 *
 * @buf    destination buffer, handed to _opaque_encode()
 * @count  handed to _opaque_encode() unchanged
 * @node   source; every member must point to valid data
 */
static inline void nodeinfo_encode(char *buf, uint32_t *count, const nodeinfo_t *node)
{
        const size_t nodename_len = strlen(node->nodename) + 1;
        const size_t clustername_len = strlen(node->clustername) + 1;
        const size_t status_len = strlen(node->status) + 1;
        const size_t home_len = strlen(node->home) + 1;
        const size_t dfinfo_len = strlen(node->dfinfo) + 1;
        const size_t admin_len = strlen(node->admin) + 1;

        _opaque_encode(buf, count,
                       node->nodename, nodename_len,
                       node->clustername, clustername_len,
                       node->status, status_len,
                       node->home, home_len,
                       node->stat, sizeof(nodestat_t),
                       node->dfinfo, dfinfo_len,
                       node->uptime, sizeof(uint32_t),
                       node->admin, admin_len,
                       NULL);
}

/*
 * Unpack a buffer written by nodeinfo_encode() into @node through
 * _opaque_decode(). Fields are read in the order nodename, clustername,
 * status, home, stat, dfinfo, uptime, admin; their lengths are not requested
 * (NULL is passed for every length slot).
 *
 * @buf    encoded data
 * @count  size of the encoded data; count + sizeof(*node) must be below
 *         BUF_LEN, otherwise YASSERT fires
 * @node   receives one pointer per field
 */
static inline void nodeinfo_decode(const char *buf, int count, nodeinfo_t *node)
{
        YASSERT(count + sizeof(*node) < BUF_LEN);

        _opaque_decode(buf, count,
                       &node->nodename,    NULL,
                       &node->clustername, NULL,
                       &node->status,      NULL,
                       &node->home,        NULL,
                       &node->stat,        NULL,
                       &node->dfinfo,      NULL,
                       &node->uptime,      NULL,
                       &node->admin,       NULL,
                       NULL);
}

/*
 * Node-level entry points. Only the prototypes are visible in this header.
 * NOTE(review): the return convention is presumably 0 on success and an
 * errno-style code on failure, as used by the inline helpers in this file --
 * confirm against the implementations.
 */
int node_init(const char *home);
int node_remove(const char *admin, const char *name);
int node_shutdown(void);

/* NOTE(review): node_fetch_variable() takes no size for `value`; the required
 * buffer size is not visible here -- confirm. */
int node_set_variable(const char *name, const char *key, const char *value);
int node_fetch_variable(const char *name, const char *key, char *value);

/* NOTE(review): `buf` and `host` are caller-provided output buffers of
 * unspecified size -- confirm the expected capacity. */
int node_getinfo(nodeinfo_t *info, const char *name, char *buf);
int node_gethost(const char *admin, const nid_t *nid, char *host);

int node_create_workdir(const char *home, const char *namespace);

int node_castoff(const char *name, int idx);
int node_pooldrop(const char *name, const char *pool);
int node_get_nid(const char *home);

/*from mq_master.c*/
/* NOTE(review): `buflen` is presumably the size of `buf` in bytes; the meaning
 * of `count` is not visible here -- confirm. */
int mq_master_queue(uint32_t op, const void *buf, int buflen,
                    int count);

/*from env.c*/
int env_prep(const char *home, int daemon);

// called by lichd
int env_init(const char *home);

// called by tools
int env_init_simple(const char *name);

/* NOTE(review): the int parameter of env_init_common() is unnamed; its
 * meaning is not visible from this header. */
int env_init_common(int);
int env_update_status(const char *status, int step);
/* Takes a server callback and the opaque argument pointer that accompanies it. */
int env_server_run(int daemon, int (*server)(void *), void *args);

/*from cluster.c*/
/* `list` is an array of `count` strings. */
int cluster_dropnode(const char **list, int count);

int cluster_movemeta(const char **list, int count);

int cluster_countnode(int *count);
/* NOTE(review): the empty parameter list declares a function with unspecified
 * arguments in pre-C23 C; `(void)` is probably what is meant. */
int cluster_is_solomode();
int cluster_countnode_writeable(int *_count);

/* Open / read / close listing calls that share a `uuid` argument.
 * NOTE(review): presumably the uuid identifies one listing session -- confirm. */
int cluster_listnode_open(const char *uuid);
int cluster_listnode(char *buf, int *buflen, const char *uuid, uint64_t offset);
int cluster_listnode_close(const char *uuid);

int cluster_listnode_iterator(func_int1_t func, void *arg);
int cluster_listnode_iterator1(func_int1_t func, void *arg);

/* cluster node list */

/*
 * One entry of the cluster node list.
 * NOTE(review): `tmp` is presumably the backing storage that the pointers in
 * `nodeinfo` refer to after decoding -- confirm against cluster_get_nodes().
 */
typedef struct {
        char tmp[MAX_BUF_LEN];
        nodeinfo_t nodeinfo;
} cluster_node_t;

/* Vector of cluster_node_t pointers, as filled by cluster_get_nodes(). */
typedef vec_t(cluster_node_t *) vec_node_t;

/*
 * Allocate one cluster_node_t with ymalloc().
 *
 * @node  receives the new object; set to NULL first, so it is NULL on failure
 * @return 0 on success, otherwise the error code returned by ymalloc()
 */
static inline int cluster_node_malloc(cluster_node_t **node)
{
        int ret;
        cluster_node_t *fresh;

        *node = NULL;

        ret = ymalloc((void **)&fresh, sizeof(*fresh));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *node = fresh;

        return 0;
err_ret:
        return ret;
}

/*
 * Release a node obtained from cluster_node_malloc().
 *
 * @node  address of the node pointer; nothing happens when *node is NULL,
 *        otherwise the address is handed to yfree()
 * @return always 0
 */
static inline int cluster_node_free(cluster_node_t **node)
{
        if (*node == NULL) {
                return 0;
        }

        yfree((void **)node);

        return 0;
}

/* Fills `nodes` with cluster_node_t pointers.
 * NOTE(review): presumably `writeable` restricts the result to writeable
 * nodes, and the entries are to be released with cluster_node_free() --
 * confirm. */
int cluster_get_nodes(vec_node_t *nodes, int writeable);

int cluster_ready(void);
/* NOTE(review): unit of `timeout` is not visible here -- confirm. */
int cluster_ping(const nid_t *nid, int timeout);
int cluster_analysis(const nid_t *nid, const str_t *key, str_t *value);
int cluster_getinfo(const nid_t *nid, const nid_t *peer, ynet_net_info_t *info);
int cluster_online(const nid_t *nid);
int cluster_set_deleting(int deleting);
int cluster_storage_area_is_null(const char *site_name);

/*from dispatch.c*/

/*
 * Fixed 128-byte status string followed by variable-length task data.
 * `task[0]` is a zero-length trailing array (a GNU extension): the payload
 * lives directly after the struct and is not counted by sizeof.
 */
typedef struct {
        char status[128];
        char task[0];
} scan_status_t;

/* Local volume operations.
 * NOTE(review): presumably the values carried in the `options` argument of
 * dispatch_vol_notifiction() -- confirm. */
typedef enum {
        LOCAL_VOL_SYNC,
        LOCAL_VOL_ADD,
        LOCAL_VOL_DEL,
        LOCAL_VOL_MOVE,
} local_vol_opt_t;

/*
 * Volume id list: a counter followed by a zero-length trailing array of
 * chkid_t (a GNU extension; the ids are stored directly after the struct).
 * NOTE(review): `volcount` is presumably the number of entries in `vols` --
 * confirm.
 */
typedef struct {
        uint64_t volcount;
        chkid_t vols[0];
} local_vol_t;

/* Size in bytes of a local_vol_t that carries exactly one chkid_t. */
#define LOCAL_VOL_SIZE (sizeof(local_vol_t) + sizeof(chkid_t))

/* Dispatch entry points; only the prototypes are visible in this header. */
int dispatch_sysstat(lich_stat_t *stat, int force);
int dispatch_pooldump(const char *pool);

int dispatch_addnode(const char *name, const nid_t *nid);
int dispatch_delnode(const char *name);

int dispatch_writeable(const nid_t *diskid, int count);
/* NOTE(review): `dfinfo` is presumably a record in the dfinfo_encode() text
 * format, and `admin_uptime` an output -- confirm. */
int dispatch_heartbeat(const char *name, const char *dfinfo, const nodestat_t *stat,
                       const char *status, uint32_t *admin_uptime);

/* NOTE(review): `options` presumably carries a local_vol_opt_t value -- confirm. */
int dispatch_vol_notifiction(const char *name, const local_vol_t *vols, uint32_t options);
int dispatch_netinfo(ynet_net_info_t *info, const nid_t *nid);

int dispatch_check_storage_area(const char *site_name);
int dispatch_list_storage_area(char *buf, int *count);

int dispatch_newid(chkid_t *chkid);

/* NOTE(review): `flag` presumably takes an OR of the __NEWDISK_* bits, and
 * `skip` is an array of `skip_count` ids to exclude -- confirm. */
int dispatch_newdisk(nid_t *_nid, int *_repnum, int repmin, const char *pool,
                     const nid_t *skip, int skip_count, int flag);
int dispatch_newdisk2(const char *pool, nid_t *_nid, const nid_t *skip,
                      int skip_count);

/*from maping.c*/

/* Mapping namespace names.
 * NOTE(review): presumably the values accepted by the `type` argument of
 * maping_set() / maping_get() / maping_drop() / maping_cleanup() -- confirm. */
#define HOST2NID    "network/host2nid"
#define NID2NETINFO "network/nid2netinfo"
#define NAME2NID    "storage/name2nid"
#define ID2NID      "storage/id2nid"
#define MAPING_MISC "misc"

int maping_init(void);
int maping_nid2netinfo(const nid_t *nid, ynet_net_info_t *info);
int maping_cleanup(const char *type);
int maping_host2nid(const char *hostname, nid_t *nid);
int maping_addr2nid(const char *addr, nid_t *nid);
/* NOTE(review): `hostname` / `value` below are caller-provided output buffers
 * with no size argument; the required capacity is not visible here. */
int maping_nid2host(const nid_t *nid, char *hostname);
int maping_getmaster(char *hostname, int force);
int maping_set(const char *type, const char *_key, const char *value);
int maping_get(const char *type, const char *_key, char *value, time_t *ctime);
int maping_drop(const char *type, const char *_key);

/*from paxos.c*/
/* NOTE(review): presumably writes state to the file at `path` -- confirm. */
int paxos_dump(const char *path);

/*network.c*/
int network_init(void);

int network_connect_master(void);
/* The inline rack helpers in this header call network_connect() with
 * _ltime == NULL, _timeout == 1 and force == 0 and ignore the result.
 * NOTE(review): unit of `_timeout` is not visible here -- confirm. */
int network_connect(const nid_t *nid, time_t *_ltime, int _timeout, int force);
int network_connect_wait(const nid_t *nid, time_t *_ltime, int _timeout, int force);
int network_connect1(const nid_t *nid);
int network_connect_byname(const char *name, nid_t *nid);

void network_ltime_reset(const nid_t *nid, time_t ltime, const char *why);
int network_ltime(const nid_t *nid, time_t *ltime);
time_t network_ltime1(const nid_t *nid);

const char *network_rname(const nid_t *nid);
/* Callers in this header pass a MAX_NAME_LEN-byte buffer for `name`. */
int network_rname1(const nid_t *nid, char *name);

void network_close(const nid_t *nid, const char *why, const time_t *ltime);


/*
 * Build the rack identifier "<site>.<rack>" for a host name.
 *
 * @name  host name; hosts_split() breaks it into site, rack, node and disk
 * @rack  output buffer of at least MAX_NAME_LEN bytes
 *
 * The output is bounded by MAX_NAME_LEN rather than MAX_BUF_LEN: every caller
 * in this header passes a MAX_NAME_LEN-byte buffer, and the result is later
 * stored in MAX_NAME_LEN-byte rows by __rack_count(). Two components of up to
 * MAX_NAME_LEN bytes each could otherwise overrun those buffers. An over-long
 * result is truncated and still NUL-terminated.
 */
static inline void name2rack(const char *name, char *rack)
{
        char _rack[MAX_NAME_LEN], _node[MAX_NAME_LEN], _disk[MAX_NAME_LEN],
                _site[MAX_NAME_LEN];

        hosts_split(name, _site, _rack, _node, _disk);
        snprintf(rack, MAX_NAME_LEN, "%s.%s", _site, _rack);
}

/*
 * Resolve the rack identifier ("<site>.<rack>") of the node @nid.
 *
 * @nid   node to look up
 * @rack  output buffer, filled by name2rack()
 * @return 0 on success, otherwise the error returned by network_rname1()
 */
static inline int nid2rack(const nid_t *nid, char *rack)
{
        int ret;
        char hostname[MAX_NAME_LEN];

        /* connection attempt; its result is not checked, a failed name
         * lookup below is what gets reported */
        network_connect(nid, NULL, 1, 0);

        ret = network_rname1(nid, hostname);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        name2rack(hostname, rack);

        return 0;
err_ret:
        return ret;
}

static inline int __rack_count(char rack_array[][MAX_NAME_LEN], int *_count, const char *rack)
{
        int ret, count, i;

        count = *_count;
        
        for(i = 0; i < count; i++) {
                DBUG("array[%u] %s --> %s\n", i, rack_array[i], rack);
                
                if (strcmp(rack_array[i], rack) == 0) {
                        ret = EEXIST;
                        goto err_ret;
                }
        }

        strcpy(rack_array[i], rack);
        *_count = count + 1;
        
        return 0;
err_ret:
        return ret;
}

/*
 * Collect the distinct racks that a list of nodes belongs to.
 *
 * @rack_array  output table of MAX_NAME_LEN-byte rows; must have room for one
 *              row per distinct rack
 * @_count      receives the number of distinct racks (written on success only)
 * @nids        nodes to inspect
 * @count       number of entries in @nids
 * @return      0 on success, otherwise the error from network_rname1() or
 *              __rack_count()
 */
static inline int nids2racks(char rack_array[][MAX_NAME_LEN], int *_count, const nid_t *nids, int count)
{
        int ret, idx, distinct = 0;
        char hostname[MAX_NAME_LEN], rack[MAX_NAME_LEN];

        for (idx = 0; idx < count; idx++) {
                const nid_t *nid = &nids[idx];

                /* connection attempt; its result is not checked */
                network_connect(nid, NULL, 1, 0);

                ret = network_rname1(nid, hostname);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                        GOTO(err_ret, ret);
                }

                name2rack(hostname, rack);

                /* EEXIST only means the rack was already recorded */
                ret = __rack_count(rack_array, &distinct, rack);
                if (ret && ret != EEXIST) {
                        GOTO(err_ret, ret);
                }
        }

        *_count = distinct;

        return 0;
err_ret:
        return ret;
}

/*
 * Count the distinct racks among the nodes listed under ETCD_NODE.
 *
 * The result is cached in function-local statics and reused for 60 time
 * units of gettime() (NOTE(review): presumably seconds -- confirm).
 *
 * @count   receives the rack count; set to 0 on failure
 * @return  0 on success; EAGAIN when etcd_list() reports ENOKEY; otherwise
 *          the error from etcd_list(), ymalloc() or __rack_count()
 *
 * Fixes relative to the earlier version:
 *  - the cached value is now actually stored; before, the cache variable was
 *    never assigned, so every call served from the cache reported 0 racks;
 *  - the cache is only consulted after a first successful refresh;
 *  - the scratch table is sized from the number of listed nodes instead of a
 *    fixed LICH_REPLICA_MAX rows on the stack, which overflowed as soon as
 *    the cluster had more distinct racks than that.
 */
static inline int rack_count(int *count)
{
        int ret, i, distinct = 0;
        static int __count__ = 0;
        static time_t last_update;
        time_t now = gettime();
        char (*rack_array)[MAX_NAME_LEN] = NULL;
        char rack[MAX_NAME_LEN];
        etcd_node_t *list = NULL, *node;

        *count = 0;
        if (last_update != 0 && now - last_update < 60) {
                *count = __count__;
                goto out;
        }

        ret = etcd_list(ETCD_NODE, &list);
        if (unlikely(ret)) {
                if (ret == ENOKEY)
                        ret = EAGAIN;

                GOTO(err_ret, ret);
        }

        if (list->num_node > 0) {
                /* one row per node is the worst case: every node in its own rack */
                ret = ymalloc((void **)&rack_array,
                              (size_t)list->num_node * sizeof(*rack_array));
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        for (i = 0; i < list->num_node; i++) {
                node = list->nodes[i];

                name2rack(node->key, rack);
                DBUG("node %s rack %s\n", node->key, rack);

                /* EEXIST only means the rack was already counted */
                ret = __rack_count(rack_array, &distinct, rack);
                if (ret && ret != EEXIST) {
                        GOTO(err_free, ret);
                }
        }

        if (rack_array != NULL)
                yfree((void **)&rack_array);
        free_etcd_node(list);

        __count__ = distinct;
        last_update = now;
        *count = distinct;

out:
        return 0;
err_free:
        if (rack_array != NULL)
                yfree((void **)&rack_array);
        free_etcd_node(list);
err_ret:
        return ret;
}

/*
 * Assert that a set of nodes is spread over distinct racks.
 *
 * When rack_count() reports at most one rack the check is skipped. Otherwise
 * the racks of the given nodes are collected with nids2racks() and YASSERT
 * requires the number of distinct racks to equal @count.
 *
 * @nid    array of nodes; at most LICH_REPLICA_MAX distinct racks fit in the
 *         local table
 * @count  number of entries in @nid
 * @return 0 on success, otherwise the error from rack_count() or nids2racks()
 */
static inline int rack_check(const nid_t *nid, int count)
{
        int ret, distinct, total;
        char rack_array[LICH_REPLICA_MAX][MAX_NAME_LEN];

        ret = rack_count(&total);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (total > 1) {
                ret = nids2racks(rack_array, &distinct, nid, count);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                YASSERT(count == distinct);
        }

        return 0;
err_ret:
        return ret;
}

#endif
